CountDownLatch
CountDownLatch是什么
源码注释描述如下:
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
它是一个同步工具类,允许一个或多个线程一直等待,直到其他线程运行完成后再执行。
看下我遇到的代码:
final class DecodeThread extends Thread {
private final MainActivity mActivity;
private final CountDownLatch mHandlerInitLatch;
private Handler mHandler;
DecodeThread(MainActivity activity) {
this.mActivity = activity;
mHandlerInitLatch = new CountDownLatch(1);
}
Handler getHandler() {
try {
mHandlerInitLatch.await();
} catch (InterruptedException ie) {
// continue?
}
return mHandler;
}
@Override
public void run() {
Looper.prepare();
mHandler = new DecodeHandler(mActivity);
mHandlerInitLatch.countDown();
Looper.loop();
}
}
CountDownLatch 工作原理
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量(上面代码构造线程数为1)。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
CountDownLatch的构造函数中的count就是闭锁需要等待的线程数量。这个值只能被设置一次,而且不能重新设置。
主线程必须在启动其他线程后立即调用CountDownLatch.await方法,这样主线程就会在这个方法上阻塞,知道其他线程完成各自任务。
其他线程完成任务后必须各自通知CountDownLatch对象,使其调用countDown方法。当count值为0时,主线程就能通过await方法恢复自己的任务。
简述其执行流程:
- 运行主线程
- 创建N个线程的CountDownLatch
- 创建启动N个线程
- 主线程运行CountDownLatch.await(),等待latch
- N个线程运行完毕,latch计数减到0
- 主线程恢复运行
再看上面的代码,主线程在getHandler处等待latch,在run处创建handler后执行latch.countDown,就是为了在get的时候能拿到非空的handler。
使用场景
- 实现最大的并行性:有时我们想同时启动多个线程,实现最大程度的并行性。例如,我们想测试一个单例类。如果我们创建一个初始计数为1的CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次countDown()方法就可以让所有的等待线程同时恢复执行。
- 开始执行前等待n个线程完成各自任务:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
- 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
例子
网上看的一个例子
// 基础checker类
import java.util.concurrent.CountDownLatch;
public abstract class BaseHealthChecker implements Runnable {
private CountDownLatch _latch;
private String _serviceName;
private boolean _serviceUp;
// Get latch object in constructor so that after completing the task, thread
// can countDown() the latch
public BaseHealthChecker(String serviceName, CountDownLatch latch) {
// TODO Auto-generated constructor stub
super();
this._latch = latch;
this._serviceName = serviceName;
this._serviceUp = false;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
verifyService();
_serviceUp = true;
} catch (Throwable t) {
// TODO: handle exception
t.printStackTrace(System.err);
_serviceUp = false;
} finally {
if (_latch != null) {
_latch.countDown();
}
}
}
public String getServiceName() {
return _serviceName;
}
public boolean isServiceUp() {
return _serviceUp;
}
// This methos needs to be implemented by all specific service checker
public abstract void verifyService();
}
然后创建三个checker,分别是CacheChecker,DatabaseChecker和NetworkChecker
import java.util.concurrent.CountDownLatch;
public class CacheHealthChecker extends BaseHealthChecker {
public CacheHealthChecker (CountDownLatch latch) {
super("Cache Service", latch);
}
@Override
public void verifyService()
{
System.out.println("Checking " + this.getServiceName());
try
{
Thread.sleep(6000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(this.getServiceName() + " is UP");
}
}
创建一个启动类,处理checker的检测
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class ApplicationStartupUtil {
//List of service checkers
private static List _services;
//This latch will be used to wait on
private static CountDownLatch _latch;
private ApplicationStartupUtil()
{
}
private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
public static ApplicationStartupUtil getInstance()
{
return INSTANCE;
}
public static boolean checkExternalServices() throws Exception
{
//Initialize the latch with number of service checkers
_latch = new CountDownLatch(3);
//All add checker in lists
_services = new ArrayList();
_services.add(new NetworkHealthChecker(_latch));
_services.add(new CacheHealthChecker(_latch));
_services.add(new DatabaseHealthChecker(_latch));
//Start service checkers using executor framework
Executor executor = Executors.newFixedThreadPool(_services.size());
for(final BaseHealthChecker v : _services)
{
executor.execute(v);
}
//Now wait till all services are checked
_latch.await();
//Services are file and now proceed startup
for(final BaseHealthChecker v : _services)
{
if( ! v.isServiceUp())
{
return false;
}
}
return true;
}
}
在主程序中运行检测
public class main {
public static void main(String[] args) {
// TODO Auto-generated method stub
boolean result = false;
try {
result = ApplicationStartupUtil.checkExternalServices();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("External services validation completed !! Result was :: "+ result);
}
}
运行结果:
Checking Network Service
Checking Cache Service
Checking Database Service
Database Service is UP
Cache Service is UP
Network Service is UP
External services validation completed !! Result was :: true
CyclicBarrier
CyclicBarrier 定义
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
也是一个同步工具类,它是让一组线程相互等待进入barrier状态,然后这组线程再执行。
初看定义可能有些懵,看个示例代码就清楚了。
示例
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i
示例中创建了监控4个线程的CyclicBarrier(4),然后启动4个线程,每个线程都持有cyclicBarrier对象。
在每个线程执行一半时,运行cyclicBarrier.await(),此时CyclicBarrier就会进行+1计数,当前线程被阻塞。当计数达到4时,解除阻塞,所有线程都继续执行。
所以运行结果是:
线程Thread-0正在写入数据…
线程Thread-3正在写入数据…
线程Thread-2正在写入数据…
线程Thread-1正在写入数据…
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…
所有线程写入完毕,继续处理其他任务…
要点
- 构造方法有两种
CyclicBarrier(int parties)
默认构造方法,参数表示拦截的线程数量。
CyclicBarrier(int parties, Runnable barrierAction)
由于线程之前的调度是由CPU决定的,所以默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)
,用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。
- await实现
CyclicBarrier同样提供带超时时间的await和不带超时时间的await。如果指定了时间,在时间内某个线程还未await,就抛出异常,所有线程继续执行后续任务。
- reset功能
reset可以使其不断地复用
两者区别
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |